-
Notifications
You must be signed in to change notification settings - Fork 594
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add nck with message to StreamingProcessController for passing fatal error messages. #5170
Conversation
…atal error messages.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few minor changes and a couple of questions.
Back to you, @cmnbroad
private static String NCK_LOG_MESSAGE = "Nck received\n\n"; | ||
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = "Nkm received\n\n"; | ||
|
||
public ProcessControllerAckResult(final boolean isPositiveAck) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an ack
can't have a message associated with it, I suggest changing the constructors to formally disallow acks with messages.
i.e. instead of ProcessControllerAckResult(final boolean isPositiveAck, final String message)
have ProcessControllerAckResult(final String message)
and comment it that it creates a nkm
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
private static String NCK_LOG_MESSAGE = "Nck received\n\n"; | ||
private static String NCK_WITH_MESSAGE_LOG_MESSAGE = "Nkm received\n\n"; | ||
|
||
public ProcessControllerAckResult(final boolean isPositiveAck) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you add javadoc to the constructors?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
} | ||
|
||
/** | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra white space.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
private final String message; | ||
|
||
// three message types can be used by the remote process | ||
private static String ACK_LOG_MESSAGE = "Ack received\n\n"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can reference the string representations for the ack
types from StreamingTrocessController
to create these strings.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True that. done.
public static String NCK_WITH_MESSAGE_MESSAGE = "nkm"; | ||
private static int ACK_MESSAGE_SIZE = 3; // "ack", "nck", or "nkm" | ||
|
||
private static int NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE = 4; // length of serialized message size |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you update this comment to say that it's the number of characters used to represent the length? It makes sense, but took me a sec to realize that this and NCK_WITH_MESSAGE_MAX_MESSAGE_LENGTH
are closely releated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I beefed up the comment a bit.
@@ -203,24 +209,27 @@ public void openAckFIFOForRead() { | |||
* @return true if an positive acknowledgement (ACK) was received, false if a negative acknowledgement (NCK) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update javadoc to reflect new return value
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
private ProcessControllerAckResult getNckWithMessageResult() throws IOException { | ||
// look for a 4 byte long string with a 4 byte decimal integer with a value < 9999 | ||
final byte[] nckMessageLengthBuffer = new byte[NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE]; | ||
final int nMessageLengthBytes = ackFIFOInputStream.read(nckMessageLengthBuffer, 0, NCK_WITH_MESSAGE_MESSAGE_LEN_SIZE); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there any concern about the case where python sends a nkm
length, but it's split over two sends?
That is, for a length 1234 message, do you need to worry about python sending 12, then 34 later?
It's not clear this will actually ever happen, just wondering.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - thats a good catch. The whole reason I chose a constant (4) for this length was to make this process of consuming it deterministic, and the Python side flushes the output stream after it writes the length and message. But I think you're right that there is still a chance that read
could return before having consumed the whole 4 bytes. Luckily, I can hijack the getNckMessage
method below, which was originally for retrieving the message string, and also use it to retrieve the length, since it already loops until it gets the number of bytes its expecting. Will need to rename it though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. I updated all of the places where we read from the stream now to use the (newly renamed) method getBytesFromStream
to make sure we always either get the expected number of bytes, or throw.
final StringBuilder sb = new StringBuilder(); | ||
while (nBytesReceived < expectedNckMessageLength) { | ||
final byte[] nckMessage = new byte[expectedNckMessageLength]; | ||
int readLen = ackFIFOInputStream.read(nckMessage, 0, expectedNckMessageLength); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to update the expected number of bytes to read with subsequent reads?
That is, if you expect to get 1234 bytes, I think you need to update the number of bytes to read in each loop iteration by subtracting the number of bytes actually read last time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - that would prevent overconsumption of the stream. Done.
in the GATK tool on whose behalf this module is running. | ||
""" | ||
"""The length of the message to be written must be 4 bytes long when serialized as a string""" | ||
nckMaxMessageLength = 9999 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be worth pulling out all constants/magic numbers into a single class for the python and another the java.
Then it might be easier to make sure they're kept in sync.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, we're racking up more of these now. That is an excellent idea.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
Codecov Report
@@ Coverage Diff @@
## master #5170 +/- ##
===============================================
+ Coverage 86.743% 86.956% +0.212%
- Complexity 29470 31848 +2378
===============================================
Files 1818 1848 +30
Lines 136436 145878 +9442
Branches 15125 16953 +1828
===============================================
+ Hits 118349 126849 +8500
- Misses 12647 13342 +695
- Partials 5440 5687 +247
|
f408fe7
to
debfdce
Compare
debfdce
to
6638229
Compare
Back to you @jonn-smith. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have one question about how the java reads the bytes from python (whether there should be a wall-clock timeout).
I'm not sure it's a problem, so if you think it's worth addressing then go for it. Otherwise, it looks good to me.
while (nBytesReceived < expectedNckMessageLength) { | ||
final byte[] nckMessage = new byte[expectedNckMessageLength]; | ||
int readLen = ackFIFOInputStream.read(nckMessage, 0, expectedNckMessageLength); | ||
while (nBytesReceived < expectedMessageLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you think it's worth putting a timeout here to make sure this doesn't hang forever? (I'm not sure whether it's necessary, or not.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd be reluctant to go back to having a timeout. The original version of the streaming controller had them, and it was about impossible to pick a timeout time that isn't triggered legitimately, at least occasionally.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah - I vaguely remembered something about that.
That's fine.
This adds a new message to the StreamingProcessController ack FIFO protocol to allow additional message detail to be passed as part of a negative ack. Fixes #5100.